a tool for shared writing and social publishing
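// Replicache push endpoint for a shared document (rootEntity). It validates the
// push request, applies each new mutation inside a single Postgres transaction,
// records the last applied mutation id per client, and then broadcasts a "poke"
// so connected clients know to pull the updated state.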
import { mutations } from "src/replicache/mutations";
import { eq, sql } from "drizzle-orm";
import { permission_token_rights, replicache_clients } from "drizzle/schema";
import { getClientGroup } from "src/replicache/utils";
import { makeRoute } from "../lib";
import { z } from "zod";
import type { Env } from "./route";
import { cachedServerMutationContext } from "src/replicache/cachedServerMutationContext";
import { drizzle } from "drizzle-orm/node-postgres";
import { pool } from "supabase/pool";

const mutationV0Schema = z.object({
  id: z.number(),
  name: z.string(),
  args: z.unknown(),
  timestamp: z.number(),
});

const mutationV1Schema = mutationV0Schema.extend({
  clientID: z.string(),
});

const pushRequestV0Schema = z.object({
  pushVersion: z.literal(0),
  schemaVersion: z.string(),
  profileID: z.string(),
  clientID: z.string(),
  mutations: z.array(mutationV0Schema),
});

const pushRequestV1Schema = z.object({
  pushVersion: z.literal(1),
  schemaVersion: z.string(),
  profileID: z.string(),
  clientGroupID: z.string(),
  mutations: z.array(mutationV1Schema),
});

// Combine both versions into final PushRequest schema
const pushRequestSchema = z.discriminatedUnion("pushVersion", [
  pushRequestV0Schema,
  pushRequestV1Schema,
]);

type PushRequestZ = z.infer<typeof pushRequestSchema>;
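// The push route: the client sends its pending mutations along with the
// permission token it is acting under; the handler replays them on the server.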
export const push = makeRoute({
  route: "push",
  input: z.object({
    pushRequest: pushRequestSchema,
    rootEntity: z.string(),
    token: z.object({ id: z.string() }),
  }),
  handler: async ({ pushRequest, rootEntity, token }, { supabase }: Env) => {
    // Only push protocol v1 is supported; older clients get a VersionNotSupported error.
    if (pushRequest.pushVersion !== 1) {
      return {
        result: { error: "VersionNotSupported", versionType: "push" } as const,
      };
    }
    // Timing instrumentation; everything below is reported in the summary log.
    let timeWaitingForLock: number;
    let timeWaitingForDbConnection: number;
    let timeProcessingMutations: number = 0;
    let timeGettingClientGroup: number = 0;
    let timeGettingTokenRights: number = 0;
    let timeFlushingContext: number = 0;
    let timeUpdatingLastMutations: number = 0;
    let mutationTimings: Array<{
      name: string;
      duration: number;
    }> = [];

    // Check out a dedicated connection from the pool so the whole push runs on
    // one session, and create a realtime channel scoped to this root entity for
    // the poke broadcast.
    let start = performance.now();
    let client = await pool.connect();
    timeWaitingForDbConnection = performance.now() - start;
    start = performance.now();
    const db = drizzle(client);
    let channel = supabase.channel(`rootEntity:${rootEntity}`);
    timeWaitingForLock = performance.now() - start;
    start = performance.now();
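    // Apply the whole push atomically: take a per-token advisory lock, replay
    // any mutations this client group has not yet had applied, then persist the
    // new last-mutation ids before committing.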
    try {
      await db.transaction(async (tx) => {
        // Derive a 32-bit hash of the token id to use as the advisory lock key.
        const tokenHash = token.id.split("").reduce((acc, char) => {
          return ((acc << 5) - acc + char.charCodeAt(0)) | 0;
        }, 0);

        // Serialize concurrent pushes for the same token; pg_advisory_xact_lock
        // blocks until the lock is free and releases it when the transaction ends.
        await tx.execute(sql`SELECT pg_advisory_xact_lock(${tokenHash})`);

        let clientGroupStart = performance.now();
        let clientGroup = await getClientGroup(tx, pushRequest.clientGroupID);
        timeGettingClientGroup = performance.now() - clientGroupStart;

        let tokenRightsStart = performance.now();
        let token_rights = await tx
          .select()
          .from(permission_token_rights)
          .where(eq(permission_token_rights.token, token.id));
        timeGettingTokenRights = performance.now() - tokenRightsStart;
        let { getContext, flush } = cachedServerMutationContext(
          tx,
          token.id,
          token_rights,
        );

        let lastMutations = new Map<string, number>();
        console.log(`Processing mutations on ${token.id}`);
        console.log(
          `Processing ${pushRequest.mutations.length} mutations:`,
          pushRequest.mutations.map((m) => m.name),
        );
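        // Replay each mutation at most once: anything at or below the client's
        // last applied mutation id is skipped, and unknown mutation names are
        // ignored rather than failing the whole push.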
        for (let mutation of pushRequest.mutations) {
          let lastMutationID = clientGroup[mutation.clientID] || 0;
          if (mutation.id <= lastMutationID) continue;

          clientGroup[mutation.clientID] = mutation.id;
          let name = mutation.name as keyof typeof mutations;
          if (!mutations[name]) {
            continue;
          }

          let mutationStart = performance.now();
          try {
            let ctx = getContext(mutation.clientID, mutation.id);
            await mutations[name](mutation.args as any, ctx);
            let mutationDuration = performance.now() - mutationStart;
            mutationTimings.push({
              name: mutation.name,
              duration: mutationDuration,
            });
          } catch (e) {
            // A failing mutation is timed and logged, but does not abort the batch.
            let mutationDuration = performance.now() - mutationStart;
            mutationTimings.push({
              name: mutation.name,
              duration: mutationDuration,
            });
            console.log(
              `Error occurred while running mutation: ${name} after ${mutationDuration.toFixed(2)}ms`,
              JSON.stringify(e),
              JSON.stringify(mutation, null, 2),
            );
          }
          lastMutations.set(mutation.clientID, mutation.id);
        }
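        // Persist the new last-mutation id for every client advanced in this push.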
        let dbUpdateStart = performance.now();
        let lastMutationIdsUpdate = Array.from(lastMutations.entries()).map(
          (entries) => ({
            client_group: pushRequest.clientGroupID,
            client_id: entries[0],
            last_mutation: entries[1],
          }),
        );
        console.log("lastMutationIdsUpdate:", lastMutationIdsUpdate);
        // Upsert keyed on client_id; `excluded` refers to the incoming row's values.
        if (lastMutationIdsUpdate.length > 0)
          await tx
            .insert(replicache_clients)
            .values(lastMutationIdsUpdate)
            .onConflictDoUpdate({
              target: replicache_clients.client_id,
              set: { last_mutation: sql`excluded.last_mutation` },
            });
        timeUpdatingLastMutations = performance.now() - dbUpdateStart;
        let flushStart = performance.now();
        await flush();
        timeFlushingContext = performance.now() - flushStart;
      });
      timeProcessingMutations = performance.now() - start;
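      // The transaction committed; broadcast a "poke" on the root entity's
      // channel so connected clients know to pull the new data.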
      await channel.send({
        type: "broadcast",
        event: "poke",
        payload: { message: "poke" },
      });
    } catch (e) {
      timeProcessingMutations = performance.now() - start;
      // Log the error; the finally block below still sends the response and
      // releases resources.
      console.log(e);
    } finally {
      // Calculate mutation statistics
      let totalMutationTime = mutationTimings.reduce(
        (sum, m) => sum + m.duration,
        0,
      );
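      // Log a per-request performance breakdown, including the five slowest mutations.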
      console.log(`
Push Request Performance Summary (${timeProcessingMutations.toFixed(2)}ms):
================================
Total Elapsed Time: ${timeProcessingMutations.toFixed(2)}ms
Time Waiting for DB Connection: ${timeWaitingForDbConnection.toFixed(2)}ms
Time Waiting For Lock: ${timeWaitingForLock.toFixed(2)}ms
Time Getting Client Group: ${timeGettingClientGroup.toFixed(2)}ms
Time Getting Token Rights: ${timeGettingTokenRights.toFixed(2)}ms
Time Updating Last Mutations: ${timeUpdatingLastMutations.toFixed(2)}ms
Time Flushing Context: ${timeFlushingContext.toFixed(2)}ms

Mutation Statistics:
===================
Total Mutations Processed: ${mutationTimings.length}
Total Mutation Execution Time: ${totalMutationTime.toFixed(2)}ms
Average Mutation Time: ${mutationTimings.length > 0 ? (totalMutationTime / mutationTimings.length).toFixed(2) : "0.00"}ms

Slowest Mutations:
${mutationTimings
  .sort((a, b) => b.duration - a.duration)
  .slice(0, 5)
  .map((m) => `  ${m.name}: ${m.duration.toFixed(2)}ms`)
  .join("\n")}
      `);

      // Always release the pooled connection and remove the realtime channel
      // before responding.
      client.release();
      await supabase.removeChannel(channel);
      return { result: undefined } as const;
    }
  },
});